from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType,DoubleType
import pandas as pd
from pyspark.sql import functions as F

# Task 4: for each province, the number of stores whose daily sales exceeded 1000
def _count_hot_stores_per_province(df, threshold=1000):
    """Count, per province, the stores with at least one day of sales above *threshold*.

    Args:
        df: DataFrame with ``storeProvince``, ``storeID``, ``receivable`` and
            ``dateTS`` columns. ``dateTS`` is presumably an epoch timestamp in
            milliseconds (its first 10 characters are used as epoch seconds)
            -- TODO confirm against the input data.
        threshold: a store-day qualifies when its summed ``receivable`` is
            strictly greater than this value.

    Returns:
        DataFrame with columns ``storeProvince`` and ``count`` (number of
        distinct qualifying store IDs in that province).
    """
    # Column.substr is 1-based; first 10 chars of the timestamp = epoch seconds.
    # from_unixtime formats in the Spark session time zone, which decides
    # where each "day" boundary falls.
    day = F.from_unixtime(df["dateTS"].substr(1, 10), "yyyy-MM-dd").alias("day")
    hot_store_days = (
        df.groupBy("storeProvince", "storeID", day)
        .agg(F.sum("receivable").alias("money"))
        .filter(F.col("money") > threshold)
    )
    # Count distinct stores per province rather than dropDuplicates(["storeID"])
    # followed by count(): dropDuplicates keeps an arbitrary row per storeID, so a
    # store ID seen under several province values was credited to only one of
    # them, chosen nondeterministically.
    return hot_store_days.groupBy("storeProvince").agg(
        F.countDistinct("storeID").alias("count")
    )


if __name__ == '__main__':
    import os

    spark = SparkSession.builder.appName("xixi4494").getOrCreate()
    try:
        # JSON sources carry their own schema, so none is declared here.
        df = spark.read.format("json").load("hdfs://node1:8020/input/mini.json")
        result = _count_hot_stores_per_province(df, threshold=1000)
        # NOTE(review): the target table is "month_rank" although this job writes
        # per-province store counts -- confirm this is the intended table.
        (
            result.write.mode("overwrite")
            .format("jdbc")
            .option("url", "jdbc:mysql://node1:3306/bigdata?useSSL=false&useUnicode=true")
            .option("dbtable", "month_rank")
            # Credentials can be supplied via the environment; the defaults are the
            # previously hard-coded values so existing deployments keep working.
            .option("user", os.environ.get("MYSQL_USER", "root"))
            .option("password", os.environ.get("MYSQL_PASSWORD", "123456"))
            .save()
        )
    finally:
        # Release the Spark application's resources even if the job fails.
        spark.stop()